package com.yeskery.nut.util.executor;

import com.yeskery.nut.util.StringUtils;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * 任务实现类
 * @param <T> 执行器数据类型
 * @author sunjay
 * 2024/9/1
 */
public class JobImpl<T> implements Job<T> {

    /** 日志对象 */
    private static final Logger logger = Logger.getLogger(JobImpl.class.getName());

    /** 任务名称 */
    private final String name;

    /** 任务步骤集合 */
    private final LinkedList<JobStep<T>> jobSteps = new LinkedList<>();

    /** 运行状态 */
    private volatile RunStatus runStatus = RunStatus.NOT_START;

    /** 执行器上下文 */
    private ExecutorContext executorContext;

    /** 重试值 */
    private int retryValue;

    /**** 重试等待时间函数 */
    private Function<Long, Long> retryWaitTimeFunction;

    /** 任务回调方法 */
    private JobCallback<T> jobCallback;

    /** 任务步骤回调方法 */
    private JobStepCallback<T> jobStepCallback;

    /**
     * 构建任务对象
     * @param name 任务名称
     */
    public JobImpl(String name) {
        if (StringUtils.isEmpty(name)) {
            throw new ExecuteException("Job Name Cannot Be Empty.");
        }
        this.name = name;
    }
    
    @Override
    public String getName() {
        return name;
    }

    @Override
    public void addJobStepFirst(JobStep<T> jobStep) {
        jobSteps.addFirst(jobStep);
    }

    @Override
    public void addJobStepLast(JobStep<T> jobStep) {
        jobSteps.addLast(jobStep);
    }

    @Override
    public void addJobStep(int index, JobStep<T> jobStep) {
        jobSteps.add(index, jobStep);
    }

    @Override
    public int getJobStepCount() {
        return jobSteps.size();
    }

    @Override
    public RunStatus getRunStatus() {
        return runStatus;
    }

    @Override
    public void setRetryValue(int retryValue) {
        this.retryValue = retryValue;
    }

    @Override
    public Function<Long, Long> getRetryWaitTimeFunction() {
        return retryWaitTimeFunction;
    }

    @Override
    public void setRetryWaitTimeFunction(Function<Long, Long> retryWaitTimeFunction) {
        this.retryWaitTimeFunction = retryWaitTimeFunction;
    }

    @Override
    public int retry() {
        return retryValue;
    }

    @Override
    public void setJobCallback(JobCallback<T> jobCallback) {
        this.jobCallback = jobCallback;
    }

    @Override
    public void setJobStepCallback(JobStepCallback<T> jobStepCallback) {
        this.jobStepCallback = jobStepCallback;
    }

    @Override
    public void setExecutorContext(ExecutorContext executorContext) {
        this.executorContext = executorContext;
    }

    @Override
    public RunStatus run(T data) {
        runStatus = RunStatus.RUNNING;
        DefaultExecutorJobConfigure executorJobConfigure = new DefaultExecutorJobConfigure();
        executorJobConfigure.setExecutorContext(executorContext);
        executorJobConfigure.setJobContext(new JobContextImpl());
        Map<Integer, RunResult> runResultMap = new HashMap<>(jobSteps.size());
        if (jobCallback != null) {
            try {
                jobCallback.callback(this, data, runStatus, runResultMap, executorJobConfigure);
            } catch (Exception e) {
                logger.log(Level.SEVERE, "JobCallback[" + jobCallback.getClass().getName() + "] Execute Fail.", e);
            }
        }
        for (int i = 0; i < jobSteps.size(); i++) {
            JobStep<T> jobStep = jobSteps.get(i);
            int times = 0;
            int retryTimes = jobStep.retry();
            if (retryTimes == 0) {
                retryTimes = retry();
            }
            RunResult runResult;
            Long waitTime = null;
            do {
                if (times > 0) {
                    Function<Long, Long> retryWaitTimeFunction = getRetryWaitTimeFunction();
                    waitTime = retryWaitTimeFunction == null ? null : retryWaitTimeFunction.apply(waitTime);
                    if (waitTime != null && waitTime > 0) {
                        try {
                            TimeUnit.MILLISECONDS.sleep(waitTime);
                        } catch (InterruptedException e) {
                            throw new ExecuteException("Step[name=" + jobStep.getName() + ", retry=" + times + "] Execute Interrupted.", e);
                        }
                    }
                    logger.warning("Step[name=" + jobStep.getName() + "] Execute Failed, Retry for " + times + " times");
                }
                try {
                    if (jobStep.skip(data, executorJobConfigure)) {
                        runResult = RunResult.SKIPPED;
                        break;
                    }
                    runResult = jobStep.execute(data, executorJobConfigure);
                    if (runResult == RunResult.SUCCESS || runResult == RunResult.FAILURE) {
                        break;
                    }
                } catch (Exception e) {
                    logger.log(Level.WARNING, "Step[name=" + jobStep.getName() + "] Execute Failed.", e);
                    runResult = RunResult.ERROR;
                }
            } while (++times <= retryTimes);
            if (jobStepCallback != null) {
                try {
                    jobStepCallback.callback(this, jobStep, data, i, runResult, executorJobConfigure);
                } catch (Exception e) {
                    logger.log(Level.SEVERE, "JobStepCallback[" + jobCallback.getClass().getName() + "] Execute Fail.", e);
                }
            }
            runResultMap.put(i, runResult);
            if (runResult == RunResult.ERROR && jobStep.exitOnError()) {
                runStatus = RunStatus.COMPLETE;
                break;
            }
            if (runResult == RunResult.FAILURE && jobStep.exitOnError()) {
                runStatus = RunStatus.COMPLETE;
                break;
            }
        }
        if (runStatus == RunStatus.RUNNING) {
            runStatus = runResultMap.values().stream().anyMatch(r -> r == RunResult.FAILURE || r == RunResult.ERROR) ? RunStatus.COMPLETE : RunStatus.FINISH;
        }
        if (jobCallback != null) {
            try {
                jobCallback.callback(this, data, runStatus, runResultMap, executorJobConfigure);
            } catch (Exception e) {
                logger.log(Level.SEVERE, "JobCallback[" + jobCallback.getClass().getName() + "] Execute Fail.", e);
            }
        }
        return runStatus;
    }
}
